8.2 通道
相比Erlang,Go并未实现严格的并发安全。
允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致和完整性。Go鼓励使用CSP通道,以通信来代替内存共享,实现并发安全。
Don’t communicate by sharing memory,share memory by communicating.
CSP:Communicating Sequential Process.
通过消息来避免竞态的模型除了CSP,还有Actor。但两者有较大区别。
作为CSP核心,通道(channel)是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。
相比起来,Actor是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。
从底层实现上来说,通道只是一个队列。同步模式下,发送和接收双方配对,然后直接复制数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入等待队列,直到有另一方写入数据或腾出空槽后被唤醒。
除传递消息(数据)外,通道还常被用作事件通知。
func main() { done:=make(chan struct{}) // 结束事件 c:=make(chan string) // 数据传输通道
go func() {
s:= ←c // 接收消息
println(s)
close(done) // 关闭通道,作为结束通知
}()
c← “hi!” // 发送消息 ←done // 阻塞,直到有数据或管道关闭 }
输出:
hi!
同步模式必须有配对操作的goroutine出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。
func main() { c:=make(chan int,3) // 创建带3个缓冲槽的异步通道
c←1 // 缓冲区未满,不会阻塞 c←2
println(←c) // 缓冲区尚有数据,不会阻塞 println(←c) }
输出:
1 2
多数时候,异步通道有助于提升性能,减少排队阻塞。
缓冲区大小仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil。
func main() { var a,b chan int=make(chan int,3),make(chan int) var c chan bool
println(ab) println(cnil)
fmt.Printf(“%p, %d\n”,a,unsafe.Sizeof(a)) }
输出:
false true 0xc820076000,8
虽然可传递指针来避免数据复制,但须额外注意数据并发安全。
内置函数cap和len返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0,据此可判断通道是同步还是异步。
func main() { a,b:=make(chan int),make(chan int,3)
b←1 b←2
println(“a:“,len(a),cap(a)) println(“b:“,len(b),cap(b)) }
输出:
a:0 0 b:2 3
收发
除使用简单的发送和接收操作符外,还可用ok-idom或range模式处理数据。
func main() { done:=make(chan struct{}) c:=make(chan int)
go func() { defer close(done) // 确保发出结束通知
for{
x,ok:= <-c
if!ok{ // 据此判断通道是否被关闭
return
}
println(x)
}
}()
c←1 c←2 c←3 close(c)
<-done
}
输出:
1 2 3
对于循环接收数据,range模式更简洁一些。
func main() { done:=make(chan struct{}) c:=make(chan int)
go func() { defer close(done)
for x:=range c{ // 循环获取消息,直到通道被关闭
println(x)
}
}()
c←1 c←2 c←3 close(c)
<-done
}
及时用close函数关闭通道引发结束通知,否则可能会导致死锁。
fatal error:all goroutines are asleep-deadlock!
通知可以是群体性的。也未必就是通知结束,可以是任何需要表达的事件。
func main() { var wg sync.WaitGroup ready:=make(chan struct{})
for i:=0;i<3;i++ { wg.Add(1)
go func(id int) {
defer wg.Done()
println(id, ":ready.") // 运动员准备就绪
<-ready // 等待发令
println(id, ":running...")
}(i)
}
time.Sleep(time.Second) println(“Ready?Go!”)
close(ready) // 砰!
wg.Wait() }
输出:
0:ready. 2:ready. 1:ready.
Ready?Go!
1:running… 0:running… 2:running…
一次性事件用close效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond实现单播或广播事件。
对于closed或nil通道,发送和接收操作都有相应规则:
- 向已关闭通道发送数据,引发panic。
- 从已关闭接收数据,返回已缓冲数据或零值。
- 无论收发,nil通道都会阻塞。
func main() { c:=make(chan int,3)
c←10 c←20 close(c)
for i:=0;i<cap(c)+1;i++ { x,ok:= ←c println(i, ”:“,ok,x) } }
输出:
0:true 10 1:true 20 2:false 0 3:false 0
重复关闭,或关闭nil通道都会引发panic错误。
panic:close of closed channel panic:close of nil channel
单向
通道默认是双向的,并不区分发送和接收端。但某些时候,我们可限制收发操作的方向来获得更严谨的操作逻辑。
尽管可用make创建单向通道,但那没有任何意义。通常使用类型转换来获取单向通道,并分别赋予操作双方。
func main() { var wg sync.WaitGroup wg.Add(2)
c:=make(chan int) var send chan←int=c var recv←chan int=c
go func() { defer wg.Done()
for x:=range recv{
println(x)
}
}()
go func() { defer wg.Done() defer close(c)
for i:=0;i<3;i++ {
send<-i
}
}()
wg.Wait() }
不能在单向通道上做逆向操作。
func main() { c:=make(chan int,2)
var send chan←int=c var recv←chan int=c
<-send // 无效操作: <-send(receive from send-only type chan<-int)
recv←1 // 无效操作:recv←1(send to receive-only type←chan int) }
同样,close不能用于接收端。
func main() { c:=make(chan int,2) var recv←chan int=c
close(recv) // 无效操作:close(recv) (cannot close receive-only channel) }
无法将单向通道重新转换回去。
func main() { var a,b chan int
a=make(chan int,2) var recv←chan int=a var send chan←int=a
b= (chan int)(recv) // 错误:cannot convert recv(type←chan int)to type chan int b= (chan int)(send) // 错误:cannot convert send(type chan←int)to type chan int }
选择
如要同时处理多个通道,可选用select语句。它会随机选择一个可用通道做收发操作。
func main() { var wg sync.WaitGroup wg.Add(2)
a,b:=make(chan int),make(chan int)
go func() { // 接收端 defer wg.Done()
for{
var(
name string
x int
ok bool
)
select{ // 随机选择可用channel接收数据
case x,ok= <-a:
name= "a"
case x,ok= <-b:
name= "b"
}
if!ok{ // 如果任一通道关闭,则终止接收
return
}
println(name,x) // 输出接收的数据信息
}
}()
go func() { // 发送端 defer wg.Done() defer close(a) defer close(b)
for i:=0;i<10;i++ {
select{ // 随机选择发送channel
case a<-i:
case b<-i*10:
}
}
}()
wg.Wait() }
输出:
b 0 a 1 a 2 b 30 a 4 a 5 b 60 b 70 a 8 b 90
如要等全部通道消息处理结束(closed),可将已完成通道设置为nil。这样它就会被阻塞,不再被select选中。
func main() { var wg sync.WaitGroup wg.Add(3)
a,b:=make(chan int),make(chan int)
go func() { // 接收端 defer wg.Done()
for{
select{
case x,ok:= <-a:
if!ok{ // 如果通道关闭,则设置为nil,阻塞
a=nil
break
}
println("a",x)
case x,ok:= <-b:
if!ok{
b=nil
break
}
println("b",x)
}
if a==nil&&b==nil{ // 全部结束,退出循环
return
}
}
}()
go func() { // 发送端a defer wg.Done() defer close(a)
for i:=0;i<3;i++ {
a<-i
}
}()
go func() { // 发送端b defer wg.Done() defer close(b)
for i:=0;i<5;i++ {
b<-i*10
}
}()
wg.Wait() }
输出:
b 0 b 10 b 20 b 30 b 40 a 0 a 1 a 2
即便是同一通道,也会随机选择case执行。
func main() { var wg sync.WaitGroup wg.Add(2)
c:=make(chan int)
go func() { // 接收端 defer wg.Done()
for{
var v int
var ok bool
select{ // 随机选择case
case v,ok= <-c:
println("a1:",v)
case v,ok= <-c:
println("a2:",v)
}
if!ok{
return
}
}
}()
go func() { // 发送端 defer wg.Done() defer close(c)
for i:=0;i<10;i++ {
select{ // 随机选择case
case c<-i:
case c<-i*10:
}
}
}()
wg.Wait() }
输出:
a1:0 a2:10 a2:2 a1:30 a1:40 a2:50 a2:60 a2:7 a1:8 a1:90 a1:0
当所有通道都不可用时,select会执行default语句。如此可避开select阻塞,但须注意处理外层循环,以免陷入空耗。
func main() { done:=make(chan struct{}) c:=make(chan int)
go func() { defer close(done)
for{
select{
case x,ok:= <-c:
if!ok{
return
}
fmt.Println("data:",x)
default: // 避免select阻塞
}
fmt.Println(time.Now())
time.Sleep(time.Second)
}
}()
time.Sleep(time.Second*5)
c←100 close(c)
<-done
}
输出:
2016-04-01 17:22:07 2016-04-01 17:22:08 2016-04-01 17:22:09 2016-04-01 17:22:10 2016-04-01 17:22:11 data:100 2016-04-01 17:22:12
也可用default处理一些默认逻辑。
func main() { done:=make(chan struct{})
data:= []chan int{ // 数据缓冲区 make(chan int,3), }
go func() { defer close(done)
for i:=0;i<10;i++ {
select{
case data[len(data)-1] <-i: // 生产数据
default: // 当前通道已满,生成新的缓存通道
data=append(data,make(chan int,3))
}
}
}()
<-done
for i:=0;i<len(data);i++ { // 显示所有数据 c:=data[i] close(c)
for x:=range c{
println(x)
}
}
}
模式
通常使用工厂方法将goroutine和通道绑定。
type receiver struct{ sync.WaitGroup data chan int }
func newReceiver() *receiver{ r:= &receiver{ data:make(chan int), }
r.Add(1) go func() { defer r.Done() for x:=range r.data{ // 接收消息,直到通道被关闭 println(“recv:“,x) } }()
return r }
func main() { r:=newReceiver() r.data←1 r.data←2
close(r.data) // 关闭通道,发出结束通知 r.Wait() // 等待接收者处理结束 }
输出:
recv:1 recv:2
鉴于通道本身就是一个并发安全的队列,可用作ID generator、Pool等用途。
type pool chan[]byte
func newPool(cap int)pool{ return make(chan[]byte,cap) }
func(p pool)get() []byte{ var v[]byte
select{ case v= ←p: // 返回 default: // 返回失败,新建 v=make([]byte,10) }
return v }
func(p pool)put(b[]byte) { select{ case p←b: // 放回 default: // 放回失败,放弃 } }
用通道实现信号量(semaphore)。
func main() { runtime.GOMAXPROCS(4) var wg sync.WaitGroup
sem:=make(chan struct{},2) // 最多允许2个并发同时执行
for i:=0;i<5;i++ { wg.Add(1)
go func(id int) {
defer wg.Done()
sem<-struct{}{} //acquire: 获取信号
defer func() { <-sem}() //release: 释放信号
time.Sleep(time.Second*2)
fmt.Println(id,time.Now())
}(i)
}
wg.Wait() }
输出:
4 2016-02-19 18:24:09 0 2016-02-19 18:24:09
2 2016-02-19 18:24:11 1 2016-02-19 18:24:11
3 2016-02-19 18:24:13
标准库time提供了timeout和tick channel实现。
func main() { go func() { for{ select{ case←time.After(time.Second*5): fmt.Println(“timeout…”) os.Exit(0) } } }()
go func() { tick:=time.Tick(time.Second)
for{
select{
case<-tick:
fmt.Println(time.Now())
}
}
}()
<-(chan struct{})(nil) // 直接用nil channel阻塞进程
}
捕获INT、TERM信号,顺便实现一个简易的atexit函数。
import( “os” “os/signal” “sync” “syscall” )
var exits= &struct{ sync.RWMutex funcs []func() signals chan os.Signal }{}
func atexit(f func()) { exits.Lock() defer exits.Unlock() exits.funcs=append(exits.funcs,f) }
func waitExit() { if exits.signals==nil{ exits.signals=make(chan os.Signal) signal.Notify(exits.signals,syscall.SIGINT,syscall.SIGTERM) }
exits.RLock() for_,f:=range exits.funcs{ defer f() // 即便某些函数panic,延迟调用也能确保后续函数执行 } // 延迟调用按FILO顺序执行 exits.RUnlock()
<-exits.signals
}
func main() { atexit(func() {println(“exit1…“) }) atexit(func() {println(“exit2…“) })
waitExit() }
性能
将发往通道的数据打包,减少传输次数,可有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单次获取更多数据(批处理),可改善因频繁加锁造成的性能问题。
写个例子测试一下(代码中已尽可能规避额外开销)。
const( max =50000000 // 数据统计上限 block =500 // 数据块大小 bufsize=100 // 缓冲区大小 )
func test() { // 普通模式: 每次传递一个整数 done:=make(chan struct{}) c:=make(chan int,bufsize)
go func() { count:=0 for x:=range c{ count+=x }
close(done)
}()
for i:=0;i<max;i++ { c←i }
close(c) ←done
}
func testBlock() { // 块模式: 每次将500个数字打包成块传输 done:=make(chan struct{}) c:=make(chan[block]int,bufsize)
go func() { count:=0 for a:=range c{ for_,x:=range a{ count+=x } }
close(done)
}()
for i:=0;i<max;i+=block{ var b[block]int // 使用数组对数据打包 for n:=0;n<block;n++ { b[n] =i+n if i+n==max-1{ break } }
c<-b
}
close(c) ←done }
输出:
BenchmarkTest-4 1 4299047783 ns/op 3296 B/op 8 allocs/op BenchmarkTestBlock-4 10 122825583 ns/op 401516 B/op 2 allocs/op
虽然单次消耗更多内存,但性能提升非常明显。如将数组改成切片会造成更多内存分配次数。
资源泄漏
通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接收阻塞状态,但一直未被唤醒。垃圾回收器并不收集此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。
func test() { c:=make(chan int)
for i:=0;i<10;i++ { go func() { ←c }() } }
func main() { test()
for{ time.Sleep(time.Second) runtime.GC() // 强制垃圾回收 } }
输出:
GODEBUG=“gctrace=1,schedtrace=1000,scheddetail=1” ./test
…
gc 33@33.112s 0%:0.019+0+0.22 ms clock,0.078+0/0/0+0.90 ms cpu,0→0→0 MB,0 MB goal,4 P(forced) SCHED 33204ms:gomaxprocs=4 idleprocs=4 threads=6 spinningthreads=0 idlethreads=4 runqueue=0 gcwaiting=0 nmidlelocked=0… P0:status=0 schedtick=2 syscalltick=33 m=-1 runqsize=0 gfreecnt=0 P1:status=0 schedtick=10 syscalltick=32 m=-1 runqsize=0 gfreecnt=0 P2:status=0 schedtick=1 syscalltick=2 m=-1 runqsize=0 gfreecnt=0 P3:status=0 schedtick=0 syscalltick=0 m=-1 runqsize=0 gfreecnt=0 M5:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M4:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M3:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M2:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M1:p=-1 curg=-1 mallocing=0 throwing=0 preemptoff=locks=1 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 M0:p=-1 curg=14 mallocing=0 throwing=0 preemptoff=locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1 G1:status=4(sleep)m=-1 lockedm=-1 G2:status=4(force gc(idle))m=-1 lockedm=-1 G3:status=4(GC sweep wait)m=-1 lockedm=-1 G4:status=4(chan receive)m=-1 lockedm=-1 G5:status=4(chan receive)m=-1 lockedm=-1 G6:status=4(chan receive)m=-1 lockedm=-1 G7:status=4(chan receive)m=-1 lockedm=-1 G8:status=4(chan receive)m=-1 lockedm=-1 G9:status=4(chan receive)m=-1 lockedm=-1 G10:status=4(chan receive)m=-1 lockedm=-1 G11:status=4(chan receive)m=-1 lockedm=-1 G12:status=4(chan receive)m=-1 lockedm=-1 G13:status=4(chan receive)m=-1 lockedm=-1 G14:status=3(timer goroutine(idle))m=0 lockedm=-1
从监控结果可以看到大量goroutine一直处于chan receive状态,无法结束。